
module Stalking
  # Puts jobs onto one of several servers through Beanstalk::Pool connections.
  #
  # One pool per server is created lazily and cached for reuse. When an attempt
  # on a server fails, the cached pool for that server is closed and dropped,
  # and the job is retried on the next server.
  class Producer
    # Builds a producer.
    #
    # options - Hash of optional settings:
    #           :timeout - Limit handed to Timeout.timeout for a single attempt,
    #                      including connection setup (default: 0.5).
    #           :pri     - Default priority passed to the pool's #put (default: 65536).
    #           :delay   - Default delay passed to the pool's #put (default: 0).
    #           :ttr     - Default ttr passed to the pool's #put (default: 120).
    #           :logger  - Object responding to #info, #error and #fatal
    #                      (default: a Stalking::NullLogger).
    #           :servers - Array of server addresses (default: ["localhost:11300"]).
    #           :tries   - Maximum number of attempts per #enqueue call
    #                      (default: the number of servers, but at least 2).
    # block   - Accepted for compatibility with existing callers; not used here.
    def initialize(options = {}, &block)
      @timeout = options[:timeout] || 0.5
      @pri = options[:pri] || 65536
      @delay = options[:delay] || 0
      @ttr = options[:ttr] || 120
      @logger = options[:logger] || Stalking::NullLogger.new
      @servers = options[:servers] || ["localhost:11300"]
      @tries = options[:tries] || [@servers.size, 2].max

      # Cache of server address => Beanstalk::Pool.
      @connections = {}
    end

    # Enqueues a job, trying up to @tries servers until one accepts it.
    #
    # name    - Job name; also used as the tube selected via the pool's #use.
    # args    - Job arguments, serialized together with the name as JSON.
    # options - Per-job overrides for :pri, :delay and :ttr.
    #
    # Returns true as soon as one attempt succeeds, false when every attempt
    # failed (or when there are no servers to try).
    def enqueue(name, args = {}, options = {})
      @logger.info "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect}"

      # Visit the servers in random order, wrapping around when there are more
      # tries than servers. An empty server list yields no attempts at all.
      candidates = @servers.shuffle.cycle.first(@tries)

      return true if candidates.any? { |server| enqueue_at(server, name, args, options) }

      # All servers are currently unavailable.
      # Enqueuing the job has failed.

      @logger.error "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect} failed"

      false
    end

    private

    # Performs a single enqueue attempt against +server+, bounded by @timeout.
    #
    # Returns true on success. On any rescued error the cached connection for
    # +server+ is closed and discarded, the error is logged, and false is
    # returned so the caller can move on to the next server.
    def enqueue_at(server, name, args = {}, options = {})
      Timeout.timeout(@timeout) do
        @connections[server] ||= Beanstalk::Pool.new([server])

        enq @connections[server], name, args, options
      end

      true
    rescue Beanstalk::NotConnected, Timeout::Error, StandardError => e
      discard_connection server

      @logger.fatal e

      false
    end

    # Removes the cached connection for +server+ and closes it on a best-effort
    # basis, so a broken pool does not linger until garbage collection.
    # A failure while closing is logged and otherwise ignored.
    def discard_connection(server)
      stale = @connections.delete(server)

      stale.close if stale.respond_to?(:close)
    rescue StandardError => e
      @logger.error e
    end

    # Selects the tube named +name+ on +connection+ and puts the JSON-encoded
    # [name, args] pair onto it, using per-job options where given and the
    # producer defaults otherwise.
    def enq(connection, name, args = {}, options = {})
      pri = options[:pri] || @pri
      delay = options[:delay] || @delay
      ttr = options[:ttr] || @ttr

      connection.use name
      connection.put [name, args].to_json, pri, delay, ttr
    end
  end
end

